{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "757a1e05-6aaf-44f7-9a8e-63b9bdf5917c",
   "metadata": {},
   "source": [
    "## Description\n",
    "This notebook demonstrates the usage of Unstructured Data Ingestion APIs. \n",
    "\n",
    "## Usage Instructions\n",
    "Run each cell sequentially to execute the notebook.\n",
    "Note some cells are for reference and in order to not accidently excute them, they are marked as \"Markdown\"."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c128728d-dfac-44ee-80dd-d2bcbc9e9963",
   "metadata": {},
   "outputs": [],
   "source": [
    "IPADDRESS = \"localhost\" #Replace this with the correct IP address\n",
    "UNSTRUCTURED_DATA_PORT = \"8086\""
   ]
  },
  {
   "cell_type": "markdown",
   "id": "aef68f73-60c3-4be5-98e1-05cd8841ca26",
   "metadata": {},
   "source": [
    "## Document Ingestion\n",
    "#### Get health of the document ingest service"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "f6e206ac-97e8-4f72-9045-d9bcb84642ae",
   "metadata": {},
   "outputs": [],
   "source": [
    "import requests\n",
    "\n",
    "url = f'http://{IPADDRESS}:{UNSTRUCTURED_DATA_PORT}/health'\n",
    "print(url)\n",
    "headers = {\n",
    "    'accept': 'application/json'\n",
    "}\n",
    "\n",
    "response = requests.get(url, headers=headers)\n",
    "\n",
    "# Print the response\n",
    "print(response.status_code)\n",
    "print(response.json())"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "8768d51b-1c7a-443e-b600-b29235cae1a3",
   "metadata": {},
   "source": [
    "#### Ingest Manuals (pdf)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "0420fee4-b652-4c5b-b82f-9e113f484e96",
   "metadata": {},
   "outputs": [],
   "source": [
    "import requests\n",
    "import os\n",
    "# URL of the API endpoint\n",
    "url = f'http://{IPADDRESS}:{UNSTRUCTURED_DATA_PORT}/documents'\n",
    "# Path to the PDF file you want to upload\n",
    "directory_path = '../data/manuals_pdf'\n",
    "\n",
    "# Loop through all files in the directory\n",
    "for filename in os.listdir(directory_path):\n",
    "    # Check if the file is a PDF\n",
    "    if filename.endswith('.pdf'):\n",
    "        file_path = os.path.join(directory_path, filename)\n",
    "\n",
    "        # Open the file in binary mode and send it in a POST request\n",
    "        with open(file_path, 'rb') as file:\n",
    "            files = {'file': file}\n",
    "            response = requests.post(url, files=files)\n",
    "\n",
    "        # Print the response from the server\n",
    "        print(f'Uploaded {filename}: {response.status_code}')\n",
    "        print(response.json())"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "960c93b5-f2d1-4e06-838d-1b37bc50eb86",
   "metadata": {},
   "source": [
    "#### Ingest FAQs (pdf)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "46ad44c7-1f4b-4b1c-8d42-b629b81de394",
   "metadata": {},
   "outputs": [],
   "source": [
    "# URL of the API endpoint\n",
    "import requests\n",
    "url = f'http://{IPADDRESS}:{UNSTRUCTURED_DATA_PORT}/documents'\n",
    "# Open the file in binary mode and send it in a POST request\n",
    "filename = \"../data/FAQ.pdf\"\n",
    "with open(filename, 'rb') as file:\n",
    "    files = {'file': file}\n",
    "    response = requests.post(url, files=files)\n",
    "\n",
    "# Print the response from the server\n",
    "print(f'Uploaded {filename}: {response.status_code}')\n",
    "print(response.json())"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "2a14e119-26a3-434b-9ff1-c0427f371d09",
   "metadata": {},
   "source": [
    "#### Get the list of documents"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "072cfc02-ed4c-4a20-8755-7abbd4a14ff4",
   "metadata": {},
   "outputs": [],
   "source": [
    "import requests\n",
    "\n",
    "# URL of the API endpoint\n",
    "url = f'http://{IPADDRESS}:{UNSTRUCTURED_DATA_PORT}/documents'\n",
    "\n",
    "# Send the GET request\n",
    "response = requests.get(url)\n",
    "\n",
    "# Print the response from the server\n",
    "print(f'Response Status Code: {response.status_code}')\n",
    "#print(response.json())\n",
    "\n",
    "# Check if the request was successful\n",
    "if response.status_code == 200:\n",
    "    data = response.json()\n",
    "    documents = data.get('documents', [])\n",
    "\n",
    "    # Format and print the list of documents\n",
    "    print(\"Available Documents:\")\n",
    "    for idx, document in enumerate(documents, start=1):\n",
    "        print(f\"{idx}. {document}\")\n",
    "else:\n",
    "    print(f\"Failed to retrieve documents. Status Code: {response.status_code}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "491db31b-9496-4dd6-b8bf-550f6d05bc48",
   "metadata": {},
   "source": [
    "## Ingesting Product information from gear-store.csv\n",
    "\n",
    "Since the data is in csv file, but we support txt file for unstructured data ingestion. We will convert data into multiple text files and ingest them."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "e0633d53-579f-4c39-8c53-5978c2a4e9fa",
   "metadata": {},
   "source": [
    "### Display Data in csv file"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "57188764-3fe5-4ee5-b9ec-b65c10f295cd",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%capture output\n",
    "! pip install pandas\n",
    "! pip install psycopg2-binary"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "bea9c0cc-a836-4103-b6b8-4b641133d298",
   "metadata": {},
   "outputs": [],
   "source": [
    "import pandas as pd\n",
    "\n",
    "# Read the CSV file\n",
    "df = pd.read_csv('../data/gear-store.csv')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "4f23cf29-e73e-4070-bfd0-93c451b9a63e",
   "metadata": {},
   "outputs": [],
   "source": [
    "from IPython.display import display\n",
    "display(df.head())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "2796fc82-1621-4220-8cf1-62243007027b",
   "metadata": {},
   "outputs": [],
   "source": [
    "len(df)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "c1e88f91-a7e7-4975-83bb-c15cf1fd73df",
   "metadata": {},
   "source": [
    "### Create *.txt file from csv data to ingest in canonical RAG"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "51029daa-1402-483c-b5cb-8c1d8a53a391",
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "import re\n",
    "\n",
    "# Function to create a valid filename\n",
    "def create_valid_filename(s):\n",
    "    # Remove invalid characters and replace spaces with underscores\n",
    "    s = re.sub(r'[^\\w\\-_\\. ]', '', s)\n",
    "    return s.replace(' ', '_')\n",
    "\n",
    "# Create the directory if it doesn't exist\n",
    "os.makedirs('../data/product', exist_ok=True)\n",
    "\n",
    "# Iterate through each row in the DataFrame\n",
    "for index, row in df.iterrows():\n",
    "    # Create filename using name, category, and subcategory\n",
    "    filename = f\"{create_valid_filename(row['name'])}_{create_valid_filename(row['category'])}_{create_valid_filename(row['subcategory'])}.txt\"\n",
    "\n",
    "    print(f\"Creating file {filename}, current index {index}\")\n",
    "    # Full path for the file\n",
    "    filepath = os.path.join('../data/product', filename)\n",
    "\n",
    "    # Create the content for the file\n",
    "    content = f\"Name: {row['name']}\\n\"\n",
    "    content += f\"Category: {row['category']}\\n\"\n",
    "    content += f\"Subcategory: {row['subcategory']}\\n\"\n",
    "    content += f\"Price: ${row['price']}\\n\"\n",
    "    content += f\"Description: {row['description']}\\n\"\n",
    "\n",
    "    # Write the content to the file\n",
    "    with open(filepath, 'w', encoding='utf-8') as file:\n",
    "        file.write(content)\n",
    "\n",
    "print(f\"Created {len(df)} files in ../data/product\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "49107597-2c46-4414-ab2d-5ea4971757cb",
   "metadata": {},
   "source": [
    "### Ingest data from newly created text file in canonical RAG"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "32129b1f-a4ee-4a1c-b3a4-f00a233de1bc",
   "metadata": {},
   "outputs": [],
   "source": [
    "import requests\n",
    "import os\n",
    "\n",
    "# Helper function to ingest document in canonical RAG retriever\n",
    "def ingest_file(filepath: str) -> bool:\n",
    "    \"\"\"\n",
    "    Ingest file in canonical RAG retriever\n",
    "\n",
    "    Args:\n",
    "        filepath: Path to the file to be ingested in retreiver\n",
    "\n",
    "    Returns:\n",
    "        bool: Status of file ingestion\n",
    "    \"\"\"\n",
    "    # URL of the API endpoint\n",
    "    url = f'http://{IPADDRESS}:{UNSTRUCTURED_DATA_PORT}/documents'\n",
    "\n",
    "    # Open the file in binary mode and send it in a POST request\n",
    "    with open(filepath, 'rb') as file:\n",
    "        files = {'file': file}\n",
    "        try:\n",
    "            response = requests.post(url, files=files)\n",
    "            return response.status_code == 200\n",
    "        except requests.exceptions.RequestException as e:\n",
    "            print(f\"Request failed for {filepath}: {e}\")\n",
    "            return False"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "490681fa-fab9-488a-aedc-520c89cacd0f",
   "metadata": {},
   "outputs": [],
   "source": [
    "from concurrent.futures import ThreadPoolExecutor, as_completed\n",
    "\n",
    "directory_path = '../data/product'\n",
    "max_workers = 5  # Adjust this based on your system's capabilities and API limits\n",
    "\n",
    "filepaths = [os.path.join(directory_path, filename) for filename in os.listdir(directory_path) if filename.endswith(\".txt\")]\n",
    "filepaths\n",
    "\n",
    "successfully_ingested = []\n",
    "failed_ingestion = []\n",
    "\n",
    "with ThreadPoolExecutor(max_workers=max_workers) as executor:\n",
    "    future_to_file = {executor.submit(ingest_file, filepath): filepath for filepath in filepaths}\n",
    "\n",
    "    for future in as_completed(future_to_file):\n",
    "        filepath = future_to_file[future]\n",
    "        try:\n",
    "            if future.result():\n",
    "                print(f\"Successfully Ingested {os.path.basename(filepath)}\")\n",
    "                successfully_ingested.append(filepath)\n",
    "            else:\n",
    "                print(f\"Failed to Ingest {os.path.basename(filepath)}\")\n",
    "                failed_ingestion.append(filepath)\n",
    "        except Exception as e:\n",
    "            print(f\"Exception occurred while ingesting {os.path.basename(filepath)}: {e}\")\n",
    "            # traceback.print_exc()\n",
    "            failed_ingestion.append(filepath)\n",
    "\n",
    "print(f\"Total files successfully ingested: {len(successfully_ingested)}\")\n",
    "print(f\"Total files failed ingestion: {len(failed_ingestion)}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "4bad4d7a-59f6-4a37-ad5d-bcc40e36edee",
   "metadata": {},
   "source": [
    "#### (For reference) Delete a document\n",
    "\n",
    "The cell is in \"raw\" and does not execute. This code is for reference alone."
   ]
  },
  {
   "cell_type": "raw",
   "id": "925c5709-7ef9-4b88-b7c4-d63e7feb18e1",
   "metadata": {},
   "source": [
    "import requests\n",
    "\n",
    "# URL of the API endpoint\n",
    "url = f'http://{IPADDRESS}:{UNSTRUCTURED_DATA_PORT}/documents'\n",
    "\n",
    "# Filename of the document to delete\n",
    "filename = 'GEFORCE_RTX_4070_SUPER_User_Guide_Rev1'\n",
    "\n",
    "# Parameters to be sent with the DELETE request\n",
    "params = {'filename': filename}\n",
    "\n",
    "# Send the DELETE request\n",
    "response = requests.delete(url, params=params)\n",
    "\n",
    "# Print the response from the server\n",
    "print(f'Response Status Code: {response.status_code}')\n",
    "print(response.json())"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "ccf25005-13c8-4cf5-9d80-f13f0f7b51d7",
   "metadata": {},
   "source": [
    "### Ingest the customer order history data into a postgres db"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "dbe6a2ac-c013-4ab9-b7b0-47a11d5bafe9",
   "metadata": {},
   "outputs": [],
   "source": [
    "\n",
    "import csv\n",
    "import re\n",
    "import psycopg2\n",
    "from datetime import datetime\n",
    "\n",
    "# Database connection parameters\n",
    "db_params = {\n",
    "    'dbname': 'customer_data',\n",
    "    'user': 'postgres',\n",
    "    'password': 'password',\n",
    "    'host': IPADDRESS,  # e.g., 'localhost' or the IP address\n",
    "    'port': '5432'   # e.g., '5432'\n",
    "}\n",
    "\n",
    "# CSV file path\n",
    "csv_file_path = '../data/orders.csv'\n",
    "\n",
    "# Connect to the database\n",
    "conn = psycopg2.connect(**db_params)\n",
    "cur = conn.cursor()\n",
    "\n",
    "# Create the table if it doesn't exist\n",
    "create_table_query = '''\n",
    "CREATE TABLE IF NOT EXISTS customer_data (\n",
    "    customer_id INTEGER NOT NULL,\n",
    "    order_id INTEGER NOT NULL,\n",
    "    product_name VARCHAR(255) NOT NULL,\n",
    "    product_description VARCHAR NOT NULL,\n",
    "    order_date DATE NOT NULL,\n",
    "    quantity INTEGER NOT NULL,\n",
    "    order_amount DECIMAL(10, 2) NOT NULL,\n",
    "    order_status VARCHAR(50),\n",
    "    return_status VARCHAR(50),\n",
    "    return_start_date DATE,\n",
    "    return_received_date DATE,\n",
    "    return_completed_date DATE,\n",
    "    return_reason VARCHAR(255),\n",
    "    notes TEXT,\n",
    "    PRIMARY KEY (customer_id, order_id)\n",
    ");\n",
    "'''\n",
    "cur.execute(create_table_query)\n",
    "\n",
    "# Open the CSV file and insert data\n",
    "with open(csv_file_path, 'r') as f:\n",
    "    reader = csv.reader(f)\n",
    "    next(reader)  # Skip the header row\n",
    "\n",
    "    for row in reader:\n",
    "        # Access columns by index as per the provided structure\n",
    "        order_id = int(row[1])  # OrderID\n",
    "        customer_id = int(row[0])  # CID (Customer ID)\n",
    "\n",
    "        # Correcting the order date to include time\n",
    "        order_date = datetime.strptime(row[4], \"%Y-%m-%dT%H:%M:%S\")  # OrderDate with time\n",
    "\n",
    "        quantity = int(row[5])  # Quantity\n",
    "\n",
    "        # Handle optional date fields with time parsing\n",
    "        return_start_date = datetime.strptime(row[9], \"%Y-%m-%dT%H:%M:%S\") if row[9] else None  # ReturnStartDate\n",
    "        return_received_date = datetime.strptime(row[10],\"%Y-%m-%dT%H:%M:%S\") if row[10] else None  # ReturnReceivedDate\n",
    "        return_completed_date = datetime.strptime(row[11], \"%Y-%m-%dT%H:%M:%S\") if row[11] else None  # ReturnCompletedDate\n",
    "\n",
    "        # Clean product name\n",
    "        product_name = re.sub(r'[®™]', '', row[2])  # ProductName\n",
    "\n",
    "        product_description = re.sub(r'[®™]', '', row[3])\n",
    "        # OrderAmount as float\n",
    "        order_amount = float(row[6].replace(',', ''))\n",
    "\n",
    "        # Insert data into the database\n",
    "        cur.execute(\n",
    "            '''\n",
    "            INSERT INTO customer_data (\n",
    "                customer_id, order_id, product_name, product_description, order_date, quantity, order_amount,\n",
    "                order_status, return_status, return_start_date, return_received_date,\n",
    "                return_completed_date, return_reason, notes\n",
    "            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)\n",
    "            ''',\n",
    "            (customer_id, order_id, product_name, product_description, order_date, quantity, order_amount,\n",
    "             row[7],  # OrderStatus\n",
    "             row[8],  # ReturnStatus\n",
    "             return_start_date, return_received_date, return_completed_date,\n",
    "             row[12],  # ReturnReason\n",
    "             row[13])  # Notes\n",
    "        )\n",
    "\n",
    "# Commit the changes and close the connection\n",
    "conn.commit()\n",
    "cur.close()\n",
    "conn.close()\n",
    "\n",
    "print(\"CSV Data imported successfully!\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "83f6c850-4083-48d3-bc3a-0b517a440027",
   "metadata": {},
   "source": [
    "#### Read the data to ensure it was written "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "4ab0d010-2250-4489-af04-88e24c1241fa",
   "metadata": {},
   "outputs": [],
   "source": [
    "import psycopg2\n",
    "\n",
    "# Database connection parameters\n",
    "db_params = {\n",
    "    'dbname': 'customer_data',\n",
    "    'user': 'postgres',\n",
    "    'password': 'password',\n",
    "    'host': IPADDRESS,  # e.g., 'localhost' or the IP address\n",
    "    'port': '5432'   # e.g., '5432'\n",
    "}\n",
    "\n",
    "# Connect to the database\n",
    "conn = psycopg2.connect(**db_params)\n",
    "cur = conn.cursor()\n",
    "\n",
    "# Query to select the first 5 rows from the customer_data table\n",
    "query = 'SELECT * FROM customer_data LIMIT 5;'\n",
    "\n",
    "# Execute the query\n",
    "cur.execute(query)\n",
    "\n",
    "# Fetch the column headers\n",
    "colnames = [desc[0] for desc in cur.description]\n",
    "\n",
    "# Fetch the first 5 rows\n",
    "rows = cur.fetchall()\n",
    "\n",
    "# Print the headers and the corresponding rows\n",
    "for i, row in enumerate(rows, start=1):\n",
    "    print(f\"\\nRow {i}:\")\n",
    "    for header, value in zip(colnames, row):\n",
    "        print(f\"{header}: {value}\")\n",
    "\n",
    "# Close the connection\n",
    "cur.close()\n",
    "conn.close()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "95ce05ca-d926-44a5-970d-9a5d95cffd1f",
   "metadata": {},
   "source": [
    "#### (For reference)Drop the postgres table\n",
    "\n",
    "The cell is in raw format and does not execute. This code is for reference alone."
   ]
  },
  {
   "cell_type": "raw",
   "id": "08c07dfd-c056-44b3-aa3b-4e12114b1b10",
   "metadata": {
    "vscode": {
     "languageId": "raw"
    }
   },
   "source": [
    "# pip install psycopg2-binary\n",
    "import psycopg2\n",
    "\n",
    "# Database connection parameters\n",
    "db_params = {\n",
    "    'dbname': 'customer_data',\n",
    "    'user': 'postgres',\n",
    "    'password': 'password',\n",
    "    'host': IPADDRESS,  # e.g., 'localhost' or the IP address\n",
    "    'port': '5432'   # e.g., '5432'\n",
    "}\n",
    "\n",
    "# Connect to the database\n",
    "conn = psycopg2.connect(**db_params)\n",
    "cur = conn.cursor()\n",
    "\n",
    "# Drop the table if it exists\n",
    "drop_table_query = 'DROP TABLE IF EXISTS customer_data;'\n",
    "\n",
    "# Execute the drop query\n",
    "cur.execute(drop_table_query)\n",
    "\n",
    "# Commit the changes and close the connection\n",
    "conn.commit()\n",
    "cur.close()\n",
    "conn.close()\n",
    "\n",
    "print(\"Table 'customer_data' dropped successfully!\")"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.12.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
